import org.apache.flink.util.Collector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;

import java.util.HashSet;
import java.util.Set;

/**
 * Co-group function that combines {@code (String, Integer)} tuples with
 * {@code (String, Double)} tuples and emits products of their numeric fields.
 *
 * <p>For every tuple on the Double side, one product is emitted per distinct Integer
 * value seen on the Integer side. Trace lines are written to standard output while
 * both inputs are processed.
 */
class MyCoGrouper
        implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {

    /**
     * Multiplies each Double value of the group with each unique Integer value of the group.
     *
     * <p>If {@code iVals} yields no elements, nothing is emitted, because there is no
     * integer factor to multiply with.
     *
     * @param iVals tuples whose {@code f1} field supplies the integer factors; duplicates
     *              are collapsed before multiplying
     * @param dVals tuples whose {@code f1} field supplies the double factors
     * @param out   collector that receives one product per (double tuple, distinct integer) pair
     */
    @Override
    public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
                        Iterable<Tuple2<String, Double>> dVals,
                        Collector<Double> out) {

        final Set<Integer> distinctFactors = gatherDistinctIntegers(iVals);

        // Separator between the trace of the Integer side and the Double side.
        System.out.println("----：----------------------");

        for (Tuple2<String, Double> doubleTuple : dVals) {
            for (Integer factor : distinctFactors) {
                // Computed once so the traced value and the emitted value are the same number.
                final double product = doubleTuple.f1 * factor;
                System.out.println("---dVals-：i-----" + doubleTuple.f0
                        + "------" + doubleTuple.f1
                        + "-----" + factor
                        + "------" + product);
                out.collect(product);
            }
        }
    }

    /** Traces every Integer-side tuple and returns the set of distinct {@code f1} values. */
    private static Set<Integer> gatherDistinctIntegers(Iterable<Tuple2<String, Integer>> tuples) {
        final Set<Integer> seen = new HashSet<>();
        for (Tuple2<String, Integer> tuple : tuples) {
            System.out.println("---iVals-：" + tuple);
            seen.add(tuple.f1);
        }
        return seen;
    }
}